Amazon AthenaのMERGE文でWHEN NOT MATCHED BY SOURCEを再現する

Amazon AthenaのMERGE文でWHEN NOT MATCHED BY SOURCEを再現する

USING句でJOINします
Clock Icon2024.08.05

データ事業本部インテグレーション部機械学習チーム・新納(にいの)です。

Amazon AthenaではIcebergフォーマットでテーブルを作成した場合、条件に応じてデータの挿入(INSERT)、更新(UPDATE)、削除(DELETE)を1つの文で行うMERGE文が利用可能です。

https://docs.aws.amazon.com/ja_jp/athena/latest/ug/merge-into-statement.html

ただし、AthenaでサポートされているMERGE文のWHEN句はWHEN MATCHEDもしくは WHEN NOT MATCHEDのみです。BigQueryでサポートされているようなWHEN NOT MATCHED BY SOURCE(ターゲットテーブルにしかキーが合致するレコードが存在しない場合)はサポートされていません。

https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement

AthenaでもMERGE文の書き方を工夫するとこのWHEN NOT MATCHED BY SOURCEを再現できます。

MERGEの条件

このブログではMERGE文による更新対象のテーブルをターゲットテーブル、結合元のテーブルをソーステーブルと表現します。

Amazon AthenaでサポートされているMERGE文のWHEN句の条件は以下の通りです。

WHEN句 説明
WHEN MATCHED
THEN DELETE
ソーステーブルとターゲットテーブルのキーが一致する場合、レコードを削除
WHEN MATCHED
THEN UPDATE SET ( column = expression [, ...] )
ソーステーブルとターゲットテーブルのキーが一致する場合、レコードを更新する
WHEN NOT MATCHED
THEN INSERT (column_name[, column_name ...]) VALUES (expression, ...)
ソーステーブルに存在するが、ターゲットテーブルに存在しない行の場合、ターゲットテーブルへインサート

全て、BigQueryでいうところのWHEN NOT MATCHED BY TARGETに相当する動作です。ターゲットテーブルに存在するが、ソーステーブルに存在しない行の場合のWHEN NOT MATCHED BY SOURCEはサポートされていません。

AthenaでWHEN NOT MATCHED BY SOURCEを再現する

以下の記事を参考にMERGE文を作成しています。

https://community.snowflake.com/s/article/Workaround-for-WHEN-NOT-MATCHED-BY-SOURCE-THEN-DELETE-feature

Athenaで再現するべく、2種類のテーブルを作成しました。更新対象となるターゲットテーブルは以下の通り。ターゲットテーブルにのみid=3のレコードがあります。

20240802_targrttable1

結合元のソーステーブルは以下の通り。

20240802_sourcetable1

今回はWHEN NOT MATCHED BY SOURCE THEN DELETE(ターゲットテーブルに存在するが、ソーステーブルに存在しない行を削除する)を再現します。

USING句の中でサブクエリを使用し、以下の条件を記述します。

  • ターゲット・ソーステーブルをMERGEキーでLEFT JOIN
  • マージ条件に一致したレコード(ソーステーブルに存在しないターゲットテーブルのレコード)を削除
MERGE INTO target_iceberg_table USING (
	SELECT target_iceberg_table.id
	FROM target_iceberg_table
		LEFT JOIN source_iceberg_table ON target_iceberg_table.id = source_iceberg_table.id
	WHERE source_iceberg_table.id is null
) s ON s.id = target_iceberg_table.id
WHEN matched THEN DELETE;

実行すると、更新対象とするターゲットテーブルはソーステーブルの中身と同一になり、ソーステーブルに存在しないがターゲットテーブルに存在する行は削除されていることがわかりました。

20240802_targrttable1_result

一つのMERGE文で更新・挿入・削除を実現する

USING句でサブクエリを使う方法でWHEN NOT MATCHED BY SOURCE THEN DELETEを実現できましたが、このままでは一つのMERGE文で削除しかできません。一度に更新・挿入・削除を実現してみましょう。

今回のターゲットテーブルは以下の通り。

20240802_targrttable2

ソーステーブルは以下の通り。

20240802_sourcetable2

このテーブルに対して、以下の処理を実現します。

  • マージキーにマッチしたレコード→UPDATE
    • 上記テーブルのid=2
  • ソーステーブルのみにの存在するレコード→INSERT
    • ソーステーブルのid=4
  • ターゲットテーブルのみに存在するレコード→DELETE
    • ターゲットテーブルのid=3

なんだかややこしいですが、図にするとこんな感じです。

20240802_tar_src_overview

このテーブルに対して以下のMERGE文を実行します。

ポイントは以下の通り。

  • USING句内のサブクエリでソーステーブルとターゲットテーブルを結合
  • 各レコードに対して'UPDATE'、'INSERT'、'DELETE'のアクションを決定し、WHEN MATCHEDの条件として指定する方法があります。
MERGE INTO target_iceberg_table
USING (
    SELECT s.*,
        CASE
            WHEN t.id IS NULL THEN 'INSERT' ELSE 'UPDATE'
        END AS merge_action
    FROM source_iceberg_table s
        LEFT JOIN target_iceberg_table t ON s.id = t.id
    UNION ALL
    SELECT t.*,
        'DELETE' AS merge_action
    FROM target_iceberg_table t
        LEFT JOIN source_iceberg_table s ON t.id = s.id
    WHERE s.id IS NULL
) merged_data
ON target_iceberg_table.id = merged_data.id
WHEN MATCHED
AND merged_data.merge_action = 'UPDATE' 
THEN UPDATE SET
    name = merged_data.name,
    record_time = merged_data.record_time
WHEN MATCHED
AND merged_data.merge_action = 'DELETE' 
THEN DELETE
WHEN NOT MATCHED THEN
INSERT (id, name, record_time)
VALUES (
        merged_data.id,
        merged_data.name,
        merged_data.record_time
    )

実行後、期待通りソーステーブルとターゲットテーブルのレコードが同一となりました。

20240802_target2result

最後に

Amazon AthenaでWHEN NOT MATCHED BY SOURCEに相当する動作を再現してみました。USING句でソーステーブルとターゲットテーブルをJOINしている関係上、データ量が多いとパフォーマンスに影響が出る恐れがあるのでご留意ください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.